Exploratory notebook on Generative Adversarial Networks (GANs). It includes toy example implementations and tests of related techniques and subjects.
A GAN is an architecture that learns by pitting two networks against each other. The goal is to learn parameters that produce a distribution close to the distribution of our dataset (the true distribution).
In [ ]:
    
import time
from PIL import Image
import numpy as np
import pdb
import os
import sys
import seaborn as sns
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import animation
from keras.models import Sequential
from keras.layers.core import Reshape, Dense, Dropout, Activation, Flatten
from keras.layers.convolutional import Convolution2D, MaxPooling2D, ZeroPadding2D, UpSampling2D
from keras.layers.advanced_activations import LeakyReLU
from keras import backend as K
from keras import optimizers
from keras.layers.normalization import BatchNormalization
from keras.datasets import mnist
from tqdm import tqdm_notebook as tqdm
%matplotlib notebook
sns.set_style("dark")
sys.path.append(os.path.join(os.getcwd(), os.pardir))
from utils.plot_utils import plot_sample_imgs
from utils.generative_utils import NoiseDistribution, set_trainable
RES_DIR = os.path.join(*[os.pardir]*2, 'data', 'deep_learning')
%load_ext autoreload
%autoreload 2
    
Example adapted from Aylien blog.
Check also here for Keras code
In [ ]:
    
# 1-D Gaussian used as the target ("true") distribution
class GaussianDistribution:
    """Normal distribution with mean ``mu`` and standard deviation ``sigma``."""

    def __init__(self, mu=4, sigma=0.5):
        self.mu, self.sigma = mu, sigma

    def sample(self, N):
        """Draw ``N`` values from the distribution and return them in ascending order."""
        return np.sort(np.random.normal(self.mu, self.sigma, N))
    
In [ ]:
    
# noise source feeding the generator
class GeneratorNoiseDistribution:
    """Evenly spaced grid over [-vrange, vrange] perturbed by a small uniform jitter."""

    def __init__(self, vrange):
        self.vrange = vrange

    def sample(self, N):
        """Return ``N`` grid points in [-vrange, vrange], each shifted up by less than 0.01."""
        grid = np.linspace(-self.vrange, self.vrange, N)
        jitter = np.random.random(N) * 0.01
        return grid + jitter
    
In [ ]:
    
def generator(input_dim, hidden_size):
    """Build the toy generator: one softplus hidden layer and a linear output layer.

    The output layer has ``input_dim`` units, so generated samples have the
    same width as the noise that is fed in.
    """
    return Sequential([
        Dense(hidden_size, input_dim=input_dim, activation=K.softplus),
        Dense(input_dim),
    ])
    
In [ ]:
    
def discriminator(input_dim, hidden_size):
    """Build the toy discriminator: three tanh layers of width ``2 * hidden_size``
    followed by a single sigmoid unit."""
    width = hidden_size*2
    layers = [Dense(width, input_dim=input_dim, activation=K.tanh)]
    layers += [Dense(width, activation=K.tanh) for _ in range(2)]
    layers.append(Dense(1, activation=K.sigmoid))
    return Sequential(layers)
    
In [ ]:
    
# init distributions
# gaussian_d is the target distribution (class defaults: mu=4, sigma=0.5);
# generator_d produces jittered, evenly spaced noise over [-8, 8]
gaussian_d = GaussianDistribution()
generator_d = GeneratorNoiseDistribution(8)
    
In [ ]:
    
# init GAN components
# both networks operate on 1-D samples (input_dim=1) and are sized from hidden_size=128
d = discriminator(1, 128)
g = generator(1, 128)
    
In [ ]:
    
# discriminator model
# `discriminator_model` is just another name for `d`: compiling it compiles `d` itself.
# Binary cross-entropy matches the 1 (true) / 0 (fake) labels used during training.
optimizer = optimizers.RMSprop(lr=0.0008, clipvalue=1.0, decay=6e-8)
discriminator_model = d
discriminator_model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    
In [ ]:
    
# adversarial model
# generator stacked on top of the discriminator: noise -> g -> d -> probability
# NOTE(review): `d` is not frozen before this compile, so training the stacked
# model presumably updates the discriminator weights together with the
# generator's — confirm whether `d.trainable = False` was intended here.
optimizer = optimizers.RMSprop(lr=0.0004, clipvalue=1.0, decay=3e-8)
adversarial_model = Sequential()
adversarial_model.add(g)
adversarial_model.add(d)
adversarial_model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    
In [ ]:
    
batch_size = 64
fig, ax = plt.subplots(dpi=100, figsize=(5, 4))
# fixed reference sample of the true distribution, redrawn on every frame
true_dist = np.reshape(gaussian_d.sample(1000), (1000, 1))
plt.show()


def animate(step):
    """Run one GAN training step and redraw the fake vs. true densities.

    Used as the ``FuncAnimation`` callback; ``step`` is the frame index and is
    only used for the on-plot caption.
    """
    # generate data
    # first we sample from the true distribution, then we generate some
    # "fake" data by feeding noise to the generator
    true_sample = np.reshape(gaussian_d.sample(batch_size), (batch_size, 1))
    # reshape to a column, consistent with the batch used for the adversarial step
    noise = np.reshape(generator_d.sample(batch_size), (batch_size, 1))
    fake_sample = g.predict(noise)

    # train discriminator
    # feed true and fake samples with respective labels (1, 0) to the discriminator
    x = np.reshape(np.concatenate((true_sample, fake_sample)), (batch_size*2, 1))
    y = np.ones([batch_size*2, 1])
    y[batch_size:, :] = 0
    discriminator_model.train_on_batch(x, y)

    # train GAN
    # feed noise to the model and expect true (1) response from discriminator,
    # which is in turn fed with data generated by the generator
    noise = np.reshape(generator_d.sample(batch_size), (batch_size, 1))
    y = np.ones([batch_size, 1])
    a_loss = adversarial_model.train_on_batch(noise, y)

    # plot
    fig.clf()
    fake = sns.distplot(fake_sample)
    fake.set_xlim([0,8])
    fake.set_ylim([0,3])
    sns.distplot(true_dist)
    # use the pyplot module imported as `plt`; the `sns.plt` alias is not
    # available in newer seaborn releases
    plt.text(3, 2, "Epoch {}, a_loss {:.3f}".format(step, a_loss[0]))


anim = animation.FuncAnimation(fig, animate, 200, repeat=False)
    
In [ ]:
    
noise = generator_d.sample(batch_size)
fake_sample = g.predict(noise)
    
In [ ]:
    
# density estimate of the generated sample
sns.distplot(fake_sample)
# pyplot is already imported as `plt`; the `sns.plt` alias is not available
# in newer seaborn releases
plt.show()
    
Example adapted from MNIST Generative Adversarial Model in Keras
In [ ]:
    
noise_d = NoiseDistribution()
    
In [ ]:
    
# length of the noise vector fed to the generator
input_dim = 100
# image shape handed to the discriminator (the plotting cells reshape to 28x28)
img_shape = (28,28,1)
    
In [ ]:
    
# maps a real-valued vector of size input_dim to a single-channel image:
# dense projection, reshape to a small feature map, then two
# upsampling + convolution stages (28x28 b/w image with the default init_side)
def generator_model(input_dim, n_channels=128, init_side=7):
    """Build the convolutional generator as a ``Sequential`` model."""
    net = Sequential()

    # project the noise and fold it into an init_side x init_side feature map
    net.add(Dense(init_side*init_side*n_channels, input_dim=input_dim, activation=LeakyReLU()))
    net.add(BatchNormalization(mode=2))
    net.add(Reshape((init_side, init_side, n_channels)))

    # each stage upsamples, then convolves with fewer filters (n_channels/2, n_channels/4)
    for divisor in (2, 4):
        net.add(UpSampling2D())
        net.add(Convolution2D(n_channels//divisor, 3, 3, border_mode='same', activation=LeakyReLU()))
        net.add(BatchNormalization(mode=2))

    # 1x1 convolution down to a single channel
    # TODO: open question from the original notes — use tanh instead of sigmoid?
    net.add(Convolution2D(1, 1, 1, border_mode='same', activation='sigmoid'))
    return net
    
In [ ]:
    
g = generator_model(input_dim=input_dim, n_channels=512)
g.summary()
    
In [ ]:
    
# plot random generated image
plt.imshow(g.predict(noise_d.sample((1, input_dim)))[0]
           .reshape(28, 28))
plt.show()
    
In [ ]:
    
# model takes an image and, after strided convolutions and flattening,
# outputs a single probability value
def discriminator_model(input_shape, init_filters=64):
    """Build the convolutional discriminator as a ``Sequential`` model."""
    layers = [
        Convolution2D(init_filters, 5, 5, subsample=(2, 2), input_shape=input_shape,
                      border_mode='same', activation=LeakyReLU(0.2)),
        # open question from the original notes: add max pooling and dropout here?
        Convolution2D(init_filters*2, 5, 5, subsample=(2, 2), border_mode='same',
                      activation=LeakyReLU(0.2)),
        # (a third convolution with init_filters*4 filters is left disabled)
        Flatten(),
        Dense(256, activation=LeakyReLU()),
        Dense(1, activation='sigmoid'),
    ]
    return Sequential(layers)
    
In [ ]:
    
d = discriminator_model(input_shape=(28,28,1), init_filters=256)
d.summary()
    
In [ ]:
    
# print prediction for random image
d.predict(g.predict(noise_d.sample((1, input_dim))))
    
In [ ]:
    
# init GAN components
# fresh, default-sized networks; they replace the instances built earlier
# for the summary/preview cells
g = generator_model(input_dim)
d = discriminator_model(img_shape)
# compile generator
# (left disabled: the generator is only ever trained through the stacked `gan` model)
#g_optimizer = optimizers.Adam(lr=0.0001)
#g.compile(loss='binary_crossentropy', optimizer=g_optimizer)
# compile discriminator
d_optimizer = optimizers.Adam(lr=0.001)
d.compile(loss='binary_crossentropy', optimizer=d_optimizer)
    
In [ ]:
    
# build adversarial model
# generator followed by discriminator: noise -> image -> probability
# NOTE(review): `d` is not frozen here; the training code calls
# set_trainable(d, False, ...) before each generator step, which presumably
# handles the freezing — confirm against utils.generative_utils.
gan = Sequential()
gan.add(g)
gan.add(d)
gan_optimizer = optimizers.Adam(lr=0.0001)
gan.compile(loss='binary_crossentropy', optimizer=gan_optimizer)
    
In [ ]:
    
gan.summary()
    
In [ ]:
    
def generator_fun(num_samples):
    """Return ``num_samples`` images produced by the generator model ``g`` from fresh noise."""
    # `generator` is the builder function of the 1-D toy example and has no
    # `.predict`; the MNIST generator model is `g`
    return g.predict(noise_d.sample((num_samples, input_dim)))
    
In [ ]:
    
# load mnist data using Keras
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# append a channel axis, convert to float32 and divide by 255
# (presumably 8-bit pixel values, giving a 0..1 range that matches the sigmoid generator output)
X_train = X_train[..., np.newaxis].astype('float32')/255
print(X_train.shape)
print(y_train.shape)
    
In [ ]:
    
def train_discriminator(d, g, noise_d, input_dim, X_train, batch_size, epoch):
    """Train the discriminator on one batch that is half real, half generated.

    ``batch_size`` real images are picked at random (with replacement) from
    ``X_train`` and labelled 1; the same number of images is generated by ``g``
    from ``noise_d`` samples and labelled 0. ``epoch`` is accepted but not used.

    Returns whatever ``d.train_on_batch`` returns for the combined batch.
    """
    # real half of the batch
    picks = np.random.randint(0, len(X_train), batch_size)
    real_images = X_train[picks,:,:,:]
    # generated half of the batch
    generated = g.predict(noise_d.sample((batch_size, input_dim)))

    # real images first (label 1), generated images second (label 0)
    inputs = np.concatenate((real_images, generated))
    labels = np.concatenate((np.ones([batch_size, 1]), np.zeros([batch_size, 1])))

    # presumably re-enables discriminator training after a generator step
    # — see utils.generative_utils.set_trainable
    set_trainable(d, True, None, None)
    return d.train_on_batch(inputs, labels)
    
In [ ]:
    
def train(d, g, gan, noise_d, input_dim, X_train, batch_size=32, n_epochs=1, add_epoch=0):
    """Alternate one discriminator step and one generator step, ``n_epochs`` times.

    ``add_epoch`` only offsets the number used in the saved image file names.

    Returns a dict with the per-step losses under the keys ``'g'`` and ``'d'``.
    """
    history = {'g':[], 'd':[]}
    for step in range(n_epochs):
        # discriminator step on a mixed real/generated batch
        history['d'].append(
            train_discriminator(d, g, noise_d, input_dim, X_train, batch_size, step))

        # generator step: feed noise through the stacked model and ask for
        # a "true" (1) response from the discriminator
        set_trainable(d, False, None, None)
        latent = noise_d.sample((batch_size, input_dim))
        wanted = np.ones([batch_size, 1])
        history['g'].append(gan.train_on_batch(latent, wanted))

        # every 10th step, pass the generator to plot_sample_imgs with a save path
        if step % 10 != 0:
            continue
        out_file = 'mnist_gen{}.jpg'.format(step+add_epoch)
        plot_sample_imgs(generator_fun, img_shape[:2], savepath=os.path.join('data', 'mnist_gan', out_file))
    return history
    
In [ ]:
    
# pretrain discriminator
# a single discriminator-only step on one batch of 128 real + 128 generated images
# before the alternating training starts
batch_size = 128
n_epochs = 1
for epoch in range(n_epochs):
    train_discriminator(d, g, noise_d, input_dim, X_train, batch_size, epoch)
    
In [ ]:
    
# interactive plotting is switched off for the training run
#plt.ion()
plt.ioff()
# override the learning rates chosen at compile time: both optimizers use 1e-3 here
K.set_value(d.optimizer.lr, 1e-3)
K.set_value(gan.optimizer.lr, 1e-3)
# add_epoch=120 offsets the numbering of the saved sample images
losses = train(d, g, gan, noise_d, input_dim, X_train, 
               batch_size=256, n_epochs=1000, add_epoch=120)
    
In [ ]:
    
# plot random generated image
plt.imshow(g.predict(np.random.randn(input_dim).reshape(1, -1))
           .reshape(28, 28))
plt.show()
    
In [ ]:
    
# show a real training image for comparison with the generated ones
# (`true_sample` only exists inside the training functions, so the image is
# taken straight from X_train)
plt.imshow(X_train[2].reshape(28, 28))
plt.show()
    
In [ ]:
    
# evaluate the stacked model on a fresh noise batch labelled as "true" (1);
# the batch is built here because no matching `noise`/`y` exist at notebook level
noise = noise_d.sample((batch_size, input_dim))
y = np.ones([batch_size, 1])
gan.test_on_batch(noise, y)
    
In [ ]:
    
# one manual generator step on a fresh noise batch labelled as "true" (1);
# the batch is built here because no matching `noise`/`y` exist at notebook level
noise = noise_d.sample((batch_size, input_dim))
y = np.ones([batch_size, 1])
gan.train_on_batch(noise, y)
    
In [ ]:
    
from keras.layers import *
from keras.models import *
from keras.optimizers import *
from keras.initializers import *
from keras.callbacks import *
from keras.utils.generic_utils import Progbar
    
In [ ]:
    
noise_d = NoiseDistribution()
    
In [ ]:
    
input_dim = 100
img_shape = (28,28,1)
num_classes = 10
    
In [ ]:
    
# utility for the standard deconvolution block used in the generator
def generator_deconv_block(filters, block_input, kernel_size=(3, 3), strides=(1, 1)):
    """Upsample ``block_input``, then apply convolution, LeakyReLU and batch normalization.

    Args:
        filters: number of convolution filters.
        block_input: tensor the block is applied to.
        kernel_size: convolution kernel size.
        strides: convolution strides.

    Returns:
        The output tensor of the block.
    """
    block = UpSampling2D()(block_input)
    # forward the caller's kernel_size instead of a hard-coded (3, 3)
    block = Convolution2D(filters, kernel_size, strides=strides, padding='same')(block)
    block = LeakyReLU()(block)
    block = BatchNormalization()(block)
    return block
    
In [ ]:
    
# unlike the basic DCGAN generator, this WGAN generator is conditioned:
# its inputs are the prior sample (noise) and the image class
def generator_model(input_dim, voc_size, init_filters=128, init_side=7, num_deconv_blocks=2):
    """Build the class-conditional generator.

    The class id is embedded into a vector of length ``input_dim`` and
    multiplied element-wise with the noise; the result goes through two dense
    layers, is reshaped to ``(init_side, init_side, init_filters)`` and passed
    through ``num_deconv_blocks`` deconvolution blocks.

    Returns a ``Model`` with inputs ``[noise, class]`` and a one-channel tanh output.
    """
    # class branch: integer id -> embedding -> flat vector
    class_in = Input(shape=(1, ), dtype='int32')
    class_embedding = Embedding(voc_size, input_dim)(class_in)
    class_vec = Flatten()(class_embedding)
    # noise branch
    noise_in = Input(shape=(input_dim, ))
    # hadamard product of the two branches
    mixed = multiply([noise_in, class_vec])

    # dense part
    hidden = Dense(1024)(mixed)
    hidden = LeakyReLU()(hidden)

    hidden = Dense(init_side*init_side*init_filters)(hidden)
    hidden = LeakyReLU()(hidden)
    hidden = BatchNormalization()(hidden)

    # convolutional part: the filter count is halved at every block
    fmap = Reshape((init_side, init_side, init_filters))(hidden)
    for stage in range(1, num_deconv_blocks + 1):
        fmap = generator_deconv_block(init_filters//(2**stage), block_input=fmap, kernel_size=(5, 5))
    image = Convolution2D(1, (2, 2), padding='same', activation='tanh')(fmap)

    return Model(inputs=[noise_in, class_in], outputs=image)
    
In [ ]:
    
# instantiate generate model
gen = generator_model(input_dim=input_dim, voc_size=10, init_filters=128)
gen.summary()
    
In [ ]:
    
# plot random generated image
plt.imshow(gen.predict([noise_d.sample((1, input_dim)), np.array([7])])[0]
           .reshape(28, 28))
plt.show()
    
In [ ]:
    
# utility for the standard convolution block used in the discriminator
def discriminator_conv_block(filters, block_input, kernel_size=(3, 3), strides=(1, 1), pool_size=None):
    """Apply convolution, LeakyReLU and batch normalization to ``block_input``.

    When ``pool_size`` is given (truthy), a max-pooling layer of that size is
    appended. Returns the output tensor of the block.
    """
    out = Convolution2D(filters, kernel_size, strides=strides, padding='same')(block_input)
    out = LeakyReLU()(out)
    out = BatchNormalization()(out)

    # no pooling requested: the block ends here
    if not pool_size:
        return out
    return MaxPool2D(pool_size=pool_size)(out)
    
In [ ]:
    
# unlike the basic DCGAN discriminator, this WGAN discriminator has two outputs
# for an input image: an authenticity score and a class prediction
def discriminator_model(input_shape, num_classes, init_filters=32, num_conv_blocks=3):
    """Build the two-headed discriminator.

    The image passes through ``num_conv_blocks`` convolution blocks whose
    filter count doubles at every block; the flattened features feed a linear
    authenticity unit and a softmax over ``num_classes`` classes.

    Returns a ``Model`` with outputs ``[authenticity, class]``.
    """
    image_in = Input(shape=input_shape)

    hidden = image_in
    for filters in (init_filters*(2**i) for i in range(num_conv_blocks)):
        hidden = discriminator_conv_block(filters, block_input=hidden, pool_size=None)
    flat = Flatten()(hidden)

    # two heads sharing the same features
    authenticity = Dense(1, activation='linear')(flat)
    predicted_class = Dense(num_classes, activation='softmax')(flat)

    return Model(inputs=[image_in], outputs=[authenticity, predicted_class])
    
In [ ]:
    
# instantiate discriminator model
dis = discriminator_model(input_shape=img_shape, num_classes=10, init_filters=32)
dis.summary()
    
In [ ]:
    
# print prediction for random image
dis.predict(gen.predict([noise_d.sample((1, input_dim)), np.array([3])]))
    
In [ ]:
    
# loss function for the authenticity output
def d_loss(y_true, y_pred):
    """Return the mean of the element-wise product of labels and predictions.

    The training code passes labels of -1 and +1, so the sign of the label
    decides whether a large prediction lowers or raises the loss.
    """
    signed_scores = y_true * y_pred
    return K.mean(signed_scores)
    
In [ ]:
    
# init GAN components
gen = generator_model(input_dim=input_dim, voc_size=num_classes, init_filters=128)
dis = discriminator_model(input_shape=img_shape, num_classes=num_classes, init_filters=32)
# compile discriminator
# one loss per output: d_loss for the authenticity score,
# sparse categorical cross-entropy for the integer class labels
dis.compile(loss=[d_loss, 'sparse_categorical_crossentropy'], 
            optimizer=RMSprop(lr=1e-4))
    
In [ ]:
    
# Build adversarial model
# (noise, class) -> gen -> dis -> (authenticity score, class prediction)
noise = Input(shape=(input_dim, ))
input_class = Input(shape=(1, ), dtype='int32')
out_autheticity, out_class = dis(gen(inputs=[noise, input_class]))
gan = Model(inputs=[noise, input_class], outputs=[out_autheticity, out_class])
# same pair of losses as the discriminator
# NOTE(review): `dis` is not frozen at this point; the training loop calls
# set_trainable(dis, False, ...) before each generator step, which presumably
# handles it — confirm against utils.generative_utils.
gan.compile(loss=[d_loss, 'sparse_categorical_crossentropy'], 
            optimizer=RMSprop(lr=1e-4))
    
In [ ]:
    
gan.summary()
    
In [ ]:
    
# the generator ends in a tanh activation, so its output has to be
# rescaled before it can be shown as an image
def deprocess(img):
    """Halve ``img``, shift it by 0.5 and clamp the result to [0, 1]."""
    rescaled = img / 2 + 0.5
    return rescaled.clip(0, 1)
    
In [ ]:
    
def generator_fun(num_samples):
    """Generate ``num_samples`` images for randomly drawn classes and deprocess them."""
    noise_batch = noise_d.sample((num_samples, input_dim))
    class_batch = np.random.randint(0, num_classes, num_samples)
    return deprocess(gen.predict([noise_batch, class_batch]))
    
In [ ]:
    
# load mnist data using Keras
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# append a channel axis, then rescale to the -1..1 range of the tanh generator output
X_train = np.expand_dims(X_train, axis=-1)
X_train = (X_train.astype(np.float32) - 127.5) / 127.5
print(X_train.shape)
print(y_train.shape)
    
In [ ]:
    
def train_discriminator(dis, gen, noise_d, input_dim, num_classes, X_train, Y_train, batch_size, epoch):
    """Run one discriminator update: clip weights, then train on a real and a generated batch.

    Args:
        dis: discriminator model with two outputs (authenticity, class).
        gen: generator model taking ``[noise, classes]``.
        noise_d: noise source exposing ``sample(shape)``.
        input_dim: length of the noise vector.
        num_classes: number of classes to draw generated labels from.
        X_train: training images.
        Y_train: class labels aligned with ``X_train``.
        batch_size: number of real and of generated samples per update.
        epoch: accepted but not used.

    Returns:
        Tuple ``(dis_true_loss, dis_fake_loss)`` as returned by the two
        ``dis.train_on_batch`` calls.
    """
    # clip weights of every layer to [-0.01, 0.01]
    # NOTE(review): get_weights() presumably also returns non-trainable state
    # (e.g. batch-norm statistics), which gets clipped too — confirm intended
    for layer in dis.layers:
        layer.set_weights([np.clip(w, -0.01, 0.01) for w in layer.get_weights()])

    # generate data
    # first we sample from the true distribution (mnist dataset), then we generate some
    # "fake" images by feeding noise to the generator

    # random indexes into the train data (drawn with replacement)
    batch_idxs = np.random.randint(0, len(X_train), batch_size)
    true_sample = X_train[batch_idxs]
    # labels come from the Y_train argument, not from a notebook-level variable
    true_sample_classes = Y_train[batch_idxs]

    # train on true samples (authenticity target -1)
    dis_true_loss = dis.train_on_batch(true_sample,
                                       [-np.ones(batch_size), true_sample_classes])

    # generate fake sample
    noise = noise_d.sample((batch_size, input_dim))
    generated_classes = np.random.randint(0, num_classes, batch_size)
    fake_sample = gen.predict([noise, generated_classes.reshape(-1, 1)])

    # train on fake samples (authenticity target +1)
    dis_fake_loss = dis.train_on_batch(fake_sample,
                                       [np.ones(batch_size), generated_classes])
    return dis_true_loss, dis_fake_loss
    
In [ ]:
    
def train(dis, gen, gan, noise_d, input_dim, num_classes, X_train, Y_train,
          batch_size=32, n_epochs=1, add_epochs=0):
    """Train the conditional WGAN for ``n_epochs`` generator steps.

    Every generator step is preceded by several discriminator steps: 40 during
    the first 15 steps of every 1000 and on every 500th step, 5 otherwise.
    Steps are counted globally as ``epoch + add_epochs``, so a run can be
    continued with consistent scheduling and image file numbering.

    Returns:
        Dict with the lists ``'gan'``, ``'dis_fake_loss'`` and
        ``'dis_true_loss'`` holding the values returned by the respective
        ``train_on_batch`` calls.
    """
    losses = {'gan':[], 'dis_fake_loss':[], 'dis_true_loss':[]}
    for epoch in tqdm(range(n_epochs), desc='Training GAN'):
        # `%` binds tighter than `+`, so the global step is computed first
        step = epoch + add_epochs
        if step % 1000 < 15 or step % 500 == 0:  # first 15 of every 1000, every 500th
            d_iters = 40
        else:
            d_iters = 5

        # train discriminator
        set_trainable(dis, True, None, None)
        for d_epoch in range(d_iters):
            dis_true_loss, dis_fake_loss = train_discriminator(dis, gen, noise_d, input_dim, num_classes,
                                                               X_train, Y_train, batch_size, epoch)
            losses['dis_fake_loss'].append(dis_fake_loss)
            losses['dis_true_loss'].append(dis_true_loss)

        set_trainable(dis, False, None, None)
        # train GAN
        # feed noise and random classes to the stacked model and ask for the
        # "true" authenticity target (-1) and the requested class
        noise = noise_d.sample((batch_size, input_dim))
        generated_classes = np.random.randint(0, num_classes, batch_size)
        gan_loss = gan.train_on_batch(
            [noise, generated_classes.reshape((-1, 1))],
            [-np.ones(batch_size), generated_classes])
        losses['gan'].append(gan_loss)
        # every 10th step, pass the generator to plot_sample_imgs with a save path
        if epoch%10 == 0:
            plot_sample_imgs(generator_fun, img_shape[:2],
                             savepath=os.path.join(RES_DIR, 'data', 'mnist_wgan', 'mnist_gen{}.jpg'.format(step)))
    return losses
    
In [ ]:
    
add_epochs = 0
    
In [ ]:
    
# interactive plotting is switched off for the training run
#plt.ion()
plt.ioff()
n_epochs = 500
losses = train(dis, gen, gan, noise_d, input_dim, num_classes, 
               X_train, y_train, 
               batch_size=64, n_epochs=n_epochs, add_epochs=add_epochs)
# remember how many steps have been run so that re-running this cell continues the count
add_epochs += n_epochs
    
In [ ]:
    
def plot_losses(losses):
    """Plot column 2 of the recorded ``losses['gan']`` entries, labelled 'gan loss'.

    NOTE(review): with two model outputs, column 2 is presumably the loss of
    the second (class) output rather than the total — confirm the column.
    """
    plt.figure()
    gan_history = np.array(losses['gan'])
    # the discriminator curves ('dis_fake_loss', 'dis_true_loss') are not plotted
    plt.plot(gan_history[:,2], label='gan loss')
    plt.legend()
    plt.show()
    
In [ ]:
    
plot_losses(losses)
    
In [ ]:
    
np.array(losses['gan'])[:,0]
    
In [ ]:
    
# assemble the images saved during training into an animated gif
import imageio
import os
RES_DIR = os.path.join(os.pardir, os.pardir, 'data', 'deep_learning')
dir_path = os.path.join(RES_DIR, 'data', 'mnist_wgan', 'test_2')
# pair every file path with the number taken from its name: the slice [9:-4]
# drops a 9-character prefix and a 4-character suffix ('mnist_gen' and '.jpg'
# for the files written by the training loop); any other file in the folder
# will make int() fail
filenames = [(os.path.join(dir_path, filename), int(filename[9:-4])) for filename in os.listdir(dir_path)]
images = []
# read the frames in increasing numeric order
for filename in sorted(filenames, key=lambda x:x[1]):
    images.append(imageio.imread(filename[0]))
imageio.mimsave(os.path.join(RES_DIR, 'data', 'mnist_wgan', 'wgan.gif'), images)
    
In [ ]: